global $wgUpdateRowsPerQuery;
$dbw = $this->getConnection( DB_MASTER );
+ $factory = wfGetLBFactory();
$watchersChunks = array_chunk( $watchers, $wgUpdateRowsPerQuery );
foreach ( $watchersChunks as $watchersChunk ) {
], $fname
);
if ( count( $watchersChunks ) > 1 ) {
- $dbw->commit( __METHOD__, 'flush' );
- wfGetLBFactory()->waitForReplication( [ 'wiki' => $dbw->getWikiID() ] );
+ $factory->commitMasterChanges( __METHOD__ );
+ $factory->waitForReplication( [ 'wiki' => $dbw->getWikiID() ] );
}
}
$this->uncacheLinkTarget( $target );
" performing implicit commit before closing connection!" );
}
- $this->commit( __METHOD__, 'flush' );
+ $this->commit( __METHOD__, self::FLUSHING_INTERNAL );
}
$closed = $this->closeConnection();
}
if ( !$this->mTrxAtomicLevels && $this->mTrxAutomaticAtomic ) {
- $this->commit( $fname, 'flush' );
+ $this->commit( $fname, self::FLUSHING_INTERNAL );
}
}
);
}
- if ( $flush === 'flush' ) {
+ if ( $flush === self::FLUSHING_INTERNAL || $flush === self::FLUSHING_ALL_PEERS ) {
if ( !$this->mTrxLevel ) {
return; // nothing to do
} elseif ( !$this->mTrxAutomatic ) {
}
final public function rollback( $fname = __METHOD__, $flush = '' ) {
- if ( $flush !== 'flush' ) {
+ if ( $flush !== self::FLUSHING_INTERNAL && $flush !== self::FLUSHING_ALL_PEERS ) {
if ( !$this->mTrxLevel ) {
wfWarn( "$fname: No transaction to rollback, something got out of sync!" );
return; // nothing to do
}
$unlocker = new ScopedCallback( function () use ( $lockKey, $fname ) {
- $this->commit( __METHOD__, 'flush' );
+ $this->commit( __METHOD__, self::FLUSHING_INTERNAL );
$this->unlock( $lockKey, $fname );
} );
- $this->commit( __METHOD__, 'flush' );
+ $this->commit( __METHOD__, self::FLUSHING_INTERNAL );
return $unlocker;
}
return 0; // already reached this point for sure
}
- // Commit any open transactions
- $this->commit( __METHOD__, 'flush' );
-
// Call doQuery() directly, to avoid opening a transaction if DBO_TRX is set
if ( $this->useGTIDs && $pos->gtids ) {
// Wait on the GTID set (MariaDB only)
* @ingroup Database
*/
interface IDatabase {
- /* Constants to onTransactionResolution() callbacks */
+ /** @var int Callback triggered immediately due to no active transaction */
const TRIGGER_IDLE = 1;
+ /** @var int Callback triggered by commit */
const TRIGGER_COMMIT = 2;
+ /** @var int Callback triggered by rollback */
const TRIGGER_ROLLBACK = 3;
+ /** @var string Transaction operation comes from service managing all DBs */
+ const FLUSHING_ALL_PEERS = 'flush';
+ /** @var string Transaction operation comes from the database class internally */
+ const FLUSHING_INTERNAL = 'flush';
+
/**
* A string describing the current software version, and possibly
* other details in a user-friendly way. Will be listed on Special:Version, etc.
* Nesting of transactions is not supported.
*
* @param string $fname
- * @param string $flush Flush flag, set to 'flush' to disable warnings about
- * explicitly committing implicit transactions, or calling commit when no
- * transaction is in progress.
+ * @param string $flush Flush flag, set to situationally valid IDatabase::FLUSHING_*
+ * constant to disable warnings about explicitly committing implicit transactions,
+ * or calling commit when no transaction is in progress.
*
* This will trigger an exception if there is an ongoing explicit transaction.
*
* No-op on non-transactional databases.
*
* @param string $fname
- * @param string $flush Flush flag, set to 'flush' to disable warnings about
- * calling rollback when no transaction is in progress. This will silently
- * break any ongoing explicit transaction. Only set the flush flag if you
- * are sure that it is safe to ignore these warnings in your context.
+ * @param string $flush Flush flag, set to a situationally valid IDatabase::FLUSHING_*
+ * constant to disable warnings about calling rollback when no transaction is in
+ * progress. This will silently break any ongoing explicit transaction. Only set the
+ * flush flag if you are sure that it is safe to ignore these warnings in your context.
* @throws DBUnexpectedError
* @since 1.23 Added $flush parameter
*/
*/
public function commitAll( $fname = __METHOD__ ) {
$this->forEachOpenConnection( function ( DatabaseBase $conn ) use ( $fname ) {
- $conn->commit( $fname, 'flush' );
+ $conn->commit( $fname, IDatabase::FLUSHING_ALL_PEERS );
} );
}
public function commitMasterChanges( $fname = __METHOD__ ) {
$this->forEachOpenMasterConnection( function ( DatabaseBase $conn ) use ( $fname ) {
if ( $conn->writesOrCallbacksPending() ) {
- $conn->commit( $fname, 'flush' );
+ $conn->commit( $fname, IDatabase::FLUSHING_ALL_PEERS );
}
} );
}
foreach ( $conns2[$masterIndex] as $conn ) {
if ( $conn->trxLevel() && $conn->writesOrCallbacksPending() ) {
try {
- $conn->rollback( $fname, 'flush' );
+ $conn->rollback( $fname, IDatabase::FLUSHING_ALL_PEERS );
} catch ( DBError $e ) {
MWExceptionHandler::logException( $e );
$failedServers[] = $conn->getServer();
public function doUpdate() {
$config = RequestContext::getMain()->getConfig();
$batchSize = $config->get( 'UpdateRowsPerQuery' );
+ $factory = wfGetLBFactory();
// Page may already be deleted, so don't just getId()
$id = $this->pageId;
foreach ( $catBatches as $catBatch ) {
$this->page->updateCategoryCounts( [], $catBatch, $id );
if ( count( $catBatches ) > 1 ) {
- $this->mDb->commit( __METHOD__, 'flush' );
- wfGetLBFactory()->waitForReplication( [ 'wiki' => $this->mDb->getWikiID() ] );
+ $factory->commitMasterChanges( __METHOD__ );
+ $factory->waitForReplication( [ 'wiki' => $this->mDb->getWikiID() ] );
}
}
foreach ( $rcIdBatches as $rcIdBatch ) {
$this->mDb->delete( 'recentchanges', [ 'rc_id' => $rcIdBatch ], __METHOD__ );
if ( count( $rcIdBatches ) > 1 ) {
- $this->mDb->commit( __METHOD__, 'flush' );
- wfGetLBFactory()->waitForReplication( [ 'wiki' => $this->mDb->getWikiID() ] );
+ $factory->commitMasterChanges( __METHOD__ );
+ $factory->waitForReplication( [ 'wiki' => $this->mDb->getWikiID() ] );
}
}
}
private function batchDeleteByPK( $table, array $conds, array $pk, $bSize ) {
$dbw = $this->mDb; // convenience
+ $factory = wfGetLBFactory();
$res = $dbw->select( $table, $pk, $conds, __METHOD__ );
$pkDeleteConds = [];
$pkDeleteConds[] = $this->mDb->makeList( (array)$row, LIST_AND );
if ( count( $pkDeleteConds ) >= $bSize ) {
$dbw->delete( $table, $dbw->makeList( $pkDeleteConds, LIST_OR ), __METHOD__ );
- $dbw->commit( __METHOD__, 'flush' );
- wfGetLBFactory()->waitForReplication( [ 'wiki' => $dbw->getWikiID() ] );
+ $factory->commitMasterChanges( __METHOD__ );
+ $factory->waitForReplication( [ 'wiki' => $dbw->getWikiID() ] );
$pkDeleteConds = [];
}
}
*/
private function incrTableUpdate( $table, $prefix, $deletions, $insertions ) {
$bSize = RequestContext::getMain()->getConfig()->get( 'UpdateRowsPerQuery' );
+ $factory = wfGetLBFactory();
if ( $table === 'page_props' ) {
$fromField = 'pp_page';
foreach ( $deleteWheres as $deleteWhere ) {
$this->mDb->delete( $table, $deleteWhere, __METHOD__ );
- $this->mDb->commit( __METHOD__, 'flush' );
- wfGetLBFactory()->waitForReplication( [ 'wiki' => $this->mDb->getWikiID() ] );
+ $factory->commitMasterChanges( __METHOD__ );
+ $factory->waitForReplication( [ 'wiki' => $this->mDb->getWikiID() ] );
}
$insertBatches = array_chunk( $insertions, $bSize );
foreach ( $insertBatches as $insertBatch ) {
$this->mDb->insert( $table, $insertBatch, __METHOD__, 'IGNORE' );
- $this->mDb->commit( __METHOD__, 'flush' );
- wfGetLBFactory()->waitForReplication( [ 'wiki' => $this->mDb->getWikiID() ] );
+ $factory->commitMasterChanges( __METHOD__ );
+ $factory->waitForReplication( [ 'wiki' => $this->mDb->getWikiID() ] );
}
if ( count( $insertions ) ) {
return false;
}
// Clear any stale REPEATABLE-READ snapshot
- $dbr->commit( __METHOD__, 'flush' );
+ wfGetLBFactory()->commitAll( __METHOD__ );
$cutoffUnix = wfTimestamp( TS_UNIX, $this->params['revTimestamp'] );
// Using ENQUEUE_FUDGE_SEC handles jobs inserted out of revision order due to the delay
}
$dbw = wfGetDB( DB_MASTER );
+ $factory = wfGetLBFactory();
$catMembChange = new CategoryMembershipChange( $title, $newRev );
$catMembChange->checkTemplateLinks();
$categoryTitle = Title::makeTitle( NS_CATEGORY, $categoryName );
$catMembChange->triggerCategoryAddedNotification( $categoryTitle );
if ( $insertCount++ && ( $insertCount % $batchSize ) == 0 ) {
- $dbw->commit( __METHOD__, 'flush' );
- wfGetLBFactory()->waitForReplication();
+ $factory->commitMasterChanges( __METHOD__ );
+ $factory->waitForReplication();
}
}
$categoryTitle = Title::makeTitle( NS_CATEGORY, $categoryName );
$catMembChange->triggerCategoryRemovedNotification( $categoryTitle );
if ( $insertCount++ && ( $insertCount++ % $batchSize ) == 0 ) {
- $dbw->commit( __METHOD__, 'flush' );
- wfGetLBFactory()->waitForReplication();
+ $factory->commitMasterChanges( __METHOD__ );
+ $factory->waitForReplication();
}
}
}
$touchTimestamp = wfTimestampNow();
$dbw = wfGetDB( DB_MASTER );
+ $factory = wfGetLBFactory();
// Update page_touched (skipping pages already touched since the root job).
// Check $wgUpdateRowsPerQuery for sanity; batch jobs are sized by that already.
foreach ( array_chunk( $pageIds, $wgUpdateRowsPerQuery ) as $batch ) {
- $dbw->commit( __METHOD__, 'flush' );
- wfGetLBFactory()->waitForReplication();
+ $factory->commitMasterChanges( __METHOD__ );
+ $factory->waitForReplication();
$dbw->update( 'page',
[ 'page_touched' => $dbw->timestamp( $touchTimestamp ) ],
return; // already in progress
}
+ $factory = wfGetLBFactory();
$cutoff = $dbw->timestamp( time() - $wgRCMaxAge );
do {
$rcIds = $dbw->selectFieldValues( 'recentchanges',
$dbw->delete( 'recentchanges', [ 'rc_id' => $rcIds ], __METHOD__ );
}
// Commit in chunks to avoid slave lag
- $dbw->commit( __METHOD__, 'flush' );
+ $factory->commitMasterChanges( __METHOD__ );
if ( count( $rcIds ) === $wgUpdateRowsPerQuery ) {
// There might be more, so try waiting for slaves
$this->getOffset() . ' inx:' . $this->getChunkIndex() . "\n" );
$dbw = $this->repo->getMasterDB();
- // Use a quick transaction since we will upload the full temp file into shared
- // storage, which takes time for large files. We don't want to hold locks then.
$dbw->update(
'uploadstash',
[
[ 'us_key' => $this->mFileKey ],
__METHOD__
);
- $dbw->commit( __METHOD__, 'flush' );
}
/**